﻿//简介高效的群集模块
// Remove the listener-count cap on `process` (0 means unlimited): this module
// attaches listeners to `process` for every subscribed event name.
process.setMaxListeners(0);
var library  = require('cosjs.library');
var cluster = require('cluster');
// Registry of worker configurations added through exports.fork().
// Entries are looked up by their array index elsewhere in this file.
var workers = [];
//添加子进程，handle：function,或者一个脚本路径
exports.fork = function(key,handle){
    var args = Array.prototype.slice.call(arguments,2);
    var config = { "id": 0,"key":key,  "args": args, "handle": handle }
    config['message'] = function(arr) {
        if(!config['id']){
            return ;
        }
        var worker = cluster.workers[config['id']];
        if(worker){
            try {  worker.send(JSON.stringify(arr));  }catch (e){}
        }
    }
    workers.push(config);
}
//启动所有进程,调用start之后不要再执行任何代码
exports.start = function () {
    if (cluster.isMaster) {
        masterStart();
    }
    else if (cluster.isWorker) {
        workerStart();
    }
}



exports.on = exports.subscribe = function(name,callback){
    if (!cluster.isWorker){
        return false;
    }
    name = 'cosjs' + name;
    var key = cluster.worker.key;
    var arr = ['subscribe',key,name];
    var msg = JSON.stringify(arr);
    process.on(name,callback);
    process.send(msg);
}

exports.publish = function(name,message){
    if (!cluster.isWorker){
        return false;
    }
    arguments[0] = 'cosjs' + arguments[0];
    var arr = Array.from(arguments);
    arr.unshift('publish');
    var msg = JSON.stringify(arr);
    process.send(msg);
}

exports.unsubscribe = function(name){
    if (!cluster.isWorker){
        return false;
    }
    arguments[0] = 'cosjs' + arguments[0];
    var arr = Array.from(arguments);
    arr.unshift('unsubscribe');
    var msg = JSON.stringify(arr);
    process.send(msg);
}


//重启当前子进程,在子进程中执行
exports.restart = function() {
    if (!cluster.isWorker){
        return false;
    }
    try {
        var killtimer = setTimeout(function () {
            process.exit();
        }, 2000);
        killtimer.unref();
        cluster.worker.disconnect();
    }
    catch(e){
        console.log('cluster.worker.disconnect false');
    }
}



//Worker bootstrap: start listening for packets from the master, look up this
//worker's configuration through the "key" environment variable and run its handle.
//Returns false when the key is missing or has no registered configuration.
function workerStart(){
    processMessage();
    var index = process.env["key"];
    var config = index ? workers[index] : null;
    if(!config){
        return false;
    }
    cluster.worker.key = index;
    process.title = config['key'];
    //The handle is either the function itself or a module path to load
    var handle = config['handle'];
    var entry = (typeof handle === 'function') ? handle : require(handle);
    if(typeof entry === 'function'){
        entry.apply(null, config['args']);
    }
}

//Master bootstrap: route worker packets to message(), log worker lifecycle
//events, re-fork registered workers when they exit, then launch every worker.
function masterStart() {
    cluster.on('message',message);
    var util = require('util');
    //util.log is deprecated (DEP0059); fall back to console.log when it is not available.
    function log(text){
        if(typeof util.log === 'function'){
            util.log(text);
        }
        else{
            console.log(text);
        }
    }
    cluster.on('exit', function (worker, code, signal) {
        var id = worker['id'];
        //Remove the entry for this worker id (not a property literally named "id")
        delete cluster.workers[id];
        var key = worker['key'];
        var config = workers[key];
        log(util.format('exit:worker[%d], key:%s, pid:%d, code:%s, signal:%s', id, (config||{})['key'], worker.process.pid,code||0, signal ||0));
        if(!config){
            //A worker without a registry entry cannot be restarted from here
            return;
        }
        //Mark the slot as free so forkWorker() launches a replacement
        config['id'] = 0;
        forkWorker(key);
    });
    cluster.on('disconnect', function (worker) {
        var id = worker['id'];
        var key = worker['key'];
        var config = workers[key]||{};
        log(util.format('disconnect:worker[%d], key:%s, pid:%d', id, config['key'], worker.process.pid));
    });
    cluster.on('listening', function (worker, address) {
        var id = worker['id'];
        var key = worker['key'];
        var config = workers[key]||{};
        log(util.format('listening:worker[%d], key:%s, pid:%d, port:%d ', id, config['key'], worker.process.pid, address.port));
    });
    //Start all processes; Object.keys yields only the array's own indices (as strings)
    Object.keys(workers).forEach(function (key) {
        forkWorker(key);
    });
}

//Launch the process for one registry entry.
//Does nothing when the entry is unknown or already has a live worker id.
//The registry key is handed to the child through the "key" environment variable.
function forkWorker(key){
    var entry = workers[key];
    var canLaunch = Boolean(entry) && !(entry.id > 0);
    if(canLaunch){
        var child = cluster.fork({ 'key': key });
        entry['id'] = child.id;
        child['key'] = key;
    }
}

//Dispatch a packet received from a worker (listener for cluster's 'message' event).
//Since Node.js 6 the event passes (worker, message, handle); older releases passed
//(message, handle). Packets are always JSON strings, so a string first argument
//means the legacy form.
//Only 'publish' and 'subscribe' packets are acted on; anything else is ignored.
function message(worker, payload){
    var data = (typeof worker === 'string' || arguments.length < 2) ? worker : payload;
    var arr = library.jsonParse(data);
    if( !Array.isArray(arr)){
        return ;
    }
    //The first element is the packet type, the rest is its payload
    var type = arr.shift();
    if(type === 'publish'){
        publish(arr);
    }
    else if(type === 'subscribe'){
        subscribe(arr);
    }
}

//Re-emit a published packet on `process`: the event name is the packet's first
//element and the whole packet is passed to the listeners as a single argument.
function publish(arr){
    var eventName = arr[0];
    process.emit(eventName, arr);
}

//Handle a 'subscribe' packet: arr is [workerKey, eventName].
//Attaches the worker's forwarder to `process` for that event so published
//packets are relayed to the worker. Returns false when the key has no usable
//registry entry.
function subscribe(arr){
    var key = arr[0],event = arr[1];
    var config = workers[key];
    if(!config || typeof config['message'] !== 'function'){
        return false;
    }
    //Attach the forwarder once per event: a worker may subscribe several
    //callbacks, or subscribe again after being restarted, and a duplicate
    //forwarder would deliver every publish more than once.
    if(process.listeners(event).indexOf(config['message']) === -1){
        process.on(event,config['message']);
    }
}

//Listen for packets sent to this process: each one is parsed with
//library.jsonParse and, when the result is an array, re-emitted on `process`
//with the first element as the event name and the rest as listener arguments.
function processMessage(){
    process.on('message', function onPacket(raw){
        var parsed = library.jsonParse(raw);
        if(Array.isArray(parsed)){
            process.emit.apply(process, parsed);
        }
    });
}
